﻿namespace DotNetCommon
{
    using System;
    using System.Collections.Concurrent;
    using System.Runtime.Serialization;
    using System.Threading;
    using System.Threading.Tasks;

    /// <summary>
    /// 使用并行编程技术 (<c>TPL</c>) 实现的一个消息队列模型(<c>生产者/消费者</c>), 基于 <c>System.Collections.Concurrent.BlockingCollection&lt;T></c>
    /// <example>
    /// 下面是封装的非阻塞日志: 
    /// <code>
    /// public class Logger<para/>
    /// {<para/>
    ///    private static readonly ProducerConsumerQueue&lt;string> logQueue = new ProducerConsumerQueue&lt;string>(msg =><para/>
    ///     {<para/>
    ///         File.AppendAllText("d:/2020-10-10.log", msg);<para/>
    ///     }, 1, 10000);<para/>
    ///     public static void Log(string msg)<para/>
    ///     {<para/>
    ///         logQueue.Add(msg);<para/>
    ///     }<para/>
    /// }<para/>
    /// </code>
    /// </example>
    /// </summary>
    /// <typeparam name="T">消息队列中存储的模型</typeparam>
    public sealed class ProducerConsumerQueue<T> : IDisposable
    {
        private readonly BlockingCollection<T> _queue;

        /// <summary>
        /// 根据指定的消费者逻辑、最大并发数,创建一个不限消息数量的队列模型
        /// </summary>
        /// <param name="consumer">消费者逻辑</param>
        /// <param name="maxConcurrencyLevel">最多有多少个消费者</param>
        public ProducerConsumerQueue(Action<T> consumer, uint maxConcurrencyLevel)
            : this(consumer, maxConcurrencyLevel, -1) { }

        /// <summary>
        /// 根据指定的消费者逻辑、最大并发数以及最大的消息数量,创建一个队列模型
        /// </summary>
        /// <param name="consumer">消费者逻辑</param>
        /// <param name="maxConcurrencyLevel">最多有多少个消费者</param>
        /// <param name="boundedCapacity">最多存储多少条消息</param>
        public ProducerConsumerQueue(Action<T> consumer, uint maxConcurrencyLevel, uint boundedCapacity)
            : this(consumer, maxConcurrencyLevel, (int)boundedCapacity) { }

        private ProducerConsumerQueue(Action<T> consumer, uint maxConcurrencyLevel, int boundedCapacity)
        {
            Ensure.NotNull(consumer, nameof(consumer));
            Ensure.That(maxConcurrencyLevel > 0, $"{nameof(maxConcurrencyLevel)} should be greater than zero.");
            Ensure.That(boundedCapacity != 0, $"{nameof(boundedCapacity)} should be greater than zero.");

            _queue = boundedCapacity < 0 ? new BlockingCollection<T>() : new BlockingCollection<T>(boundedCapacity);

            MaximumConcurrencyLevel = maxConcurrencyLevel;
            Completion = Configure(consumer);
        }

        /// <summary>
        /// 获取消费者的最大并发量
        /// </summary>
        public uint MaximumConcurrencyLevel { get; }

        /// <summary>
        /// 获取最多可存储的消息数量
        /// </summary>
        public int Capacity => _queue.BoundedCapacity;

        /// <summary>
        /// 获取当前等待消费的消息数量
        /// </summary>
        public uint PendingCount => (uint)_queue.Count;

        /// <summary>
        /// 获取当前等待消费的消息
        /// </summary>
        public T[] PendingItems => _queue.ToArray();

        /// <summary>
        /// 获取一个 <c>Task&lt;bool&gt;</c>,表示是否所有的消费者都已经工作完毕.
        /// </summary>
        /// <remarks>
        /// 只有当你调用<seealso cref="CompleteAdding"/>方法后,队列才会结束,消费者在消费完最后一个消息后也会随之结束,你获取到的 <c>Task</c>才会处于完成状态,否则,当你调用<c>Task&lt;bool&gt;.Result</c>时,将一直处于线程阻塞状态
        /// </remarks>
        public Task<bool> Completion { get; }

        /// <summary>
        /// 消费者逻辑出现错误时的事件
        /// </summary>
        public event EventHandler<ProducerConsumerQueueException> OnException;

        /// <summary>
        /// 添加消息到消息队列 <see cref="ProducerConsumerQueue{T}"/>. 
        /// 如果队列中的消息数量已达最大值,这个方法将处于阻塞状态,直到队列可以接受这个消息为止
        /// </summary>
        /// <param name="item">要添加的消息</param>
        public void Add(T item) => Add(item, CancellationToken.None);

        /// <summary>
        /// 添加消息到消息队列 <see cref="ProducerConsumerQueue{T}"/>. 
        /// 如果队列中的消息数量已达最大值,这个方法将处于阻塞状态,直到队列可以接受这个消息为止
        /// </summary>
        /// <param name="item">要添加的消息</param>
        /// <param name="cancellationToken">添加消息的时候观察cancellationToken是否已被取消,如果被取消,程序报异常</param>
        public void Add(T item, CancellationToken cancellationToken)
        {
            try
            {
                _queue.Add(item, cancellationToken);
            }
            catch (Exception e)
            {
                OnException?.Invoke(this, new ProducerConsumerQueueException("Exception occurred when adding item.", e)
                {
                    Type = ProducerConsumerQueueExceptionType.Add,
                    Model = item
                });
            }
        }

        /// <summary>
        /// 尝试将消息添加到队列<see cref="ProducerConsumerQueue{T}"/>.
        /// </summary>
        /// <param name="item">被添加到队列中的消息</param>
        /// <remarks>
        /// 可以在<seealso cref="OnException"/>事件上进行捕捉
        /// </remarks>
        /// <exception cref="ProducerConsumerQueueException">当消息添加失败时或向一个已完成的队列添加消息时</exception>
        /// <returns>
        /// 消息添加成功返回 <c>True</c> 否则返回 <c>False</c>
        /// </returns>
        public bool TryAdd(T item) => TryAdd(item, TimeSpan.Zero, CancellationToken.None);

        /// <summary>
        /// 尝试在指定的时间段内将消息添加到队列<see cref="ProducerConsumerQueue{T}"/>
        /// </summary>
        /// <param name="item">被添加到队列中的消息</param>
        /// <param name="timeout">超时时间</param>
        /// <exception cref="ProducerConsumerQueueException">当消息添加失败时或向一个已完成的队列添加消息时</exception>
        /// <remarks>
        /// 可以在<seealso cref="OnException"/>事件上进行捕捉
        /// </remarks>
        /// <returns>
        /// 消息添加成功返回 <c>True</c> 否则返回 <c>False</c>
        /// </returns>
        public bool TryAdd(T item, TimeSpan timeout) => TryAdd(item, timeout, CancellationToken.None);

        /// <summary>
        /// 尝试在指定的时间段内将消息添加到队列<see cref="ProducerConsumerQueue{T}"/>
        /// </summary>
        /// <param name="item">被添加到队列中的消息</param>
        /// <param name="timeout">超时时间</param>
        /// <param name="cancellationToken">添加消息的时候观察cancellationToken是否已被取消,如果被取消,直接返回false并抛异常</param>
        /// <exception cref="ProducerConsumerQueueException">当消息添加失败时或向一个已完成的队列添加消息时</exception>
        /// <remarks>
        /// 可以在<seealso cref="OnException"/>事件上进行捕捉
        /// </remarks>
        /// <returns>
        /// 消息添加成功返回 <c>True</c> 否则返回 <c>False</c>
        /// </returns>
        public bool TryAdd(T item, TimeSpan timeout, CancellationToken cancellationToken)
        {
            try
            {
                return _queue.TryAdd(item, (int)timeout.TotalMilliseconds, cancellationToken);
            }
            catch (Exception e)
            {
                OnException?.Invoke(this, new ProducerConsumerQueueException("Exception occurred when adding item.", e)
                {
                    Type = ProducerConsumerQueueExceptionType.Add,
                    Model = item
                });
                return false;
            }
        }

        /// <summary>
        /// 将队列<see cref="ProducerConsumerQueue{T}"/> 标记为完成,之后不能再向队列中添加消息
        /// </summary>
        public void CompleteAdding() => _queue.CompleteAdding();

        /// <summary>
        /// 释放资源
        /// </summary>
        public void Dispose() => _queue?.Dispose();

        private Task<bool> Configure(Action<T> consumer)
        {
            var workers = Task.Factory.StartNew(() =>
                {
                    Parallel.ForEach(
                        Partitioner.Create(
                            _queue.GetConsumingEnumerable(),
                            EnumerablePartitionerOptions.NoBuffering),
                        new ParallelOptions { MaxDegreeOfParallelism = (int)MaximumConcurrencyLevel },
                        WrapConsumer);
                },
                CancellationToken.None,
                TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach,
                TaskScheduler.Default);

            var tcs = new TaskCompletionSource<bool>();
            workers.ContinueWith(task => tcs.SetResult(false),
                TaskContinuationOptions.NotOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously);

            workers.ContinueWith(task => tcs.SetResult(true),
                TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously);

            return tcs.Task;

            void WrapConsumer(T x)
            {
                try
                {
                    consumer(x);
                }
                catch (Exception e)
                {
                    OnException?.Invoke(this, new ProducerConsumerQueueException("Exception occurred.", e)
                    {
                        Type = ProducerConsumerQueueExceptionType.Consume,
                        Model = x
                    });
                }
            }
        }
    }

    /// <summary>
    /// 异常类型
    /// </summary>
    public enum ProducerConsumerQueueExceptionType
    {
        /// <summary>
        /// 添加消息时
        /// </summary>
        Add,
        /// <summary>
        /// 消费消息时
        /// </summary>
        Consume
    }

    /// <summary>
    /// 表示<see cref="ProducerConsumerQueue{T}"/>抛出的异常
    /// </summary>
    [Serializable]
    public sealed class ProducerConsumerQueueException : Exception
    {
        /// <summary>
        /// 异常类型
        /// </summary>
        public ProducerConsumerQueueExceptionType Type { get; set; }

        /// <summary>
        /// 消息实体
        /// </summary>
        public object Model { get; set; }

        /// <summary>
        /// Creates an instance of the <see cref="ProducerConsumerQueueException"/>.
        /// </summary>
        internal ProducerConsumerQueueException() { }


        /// <summary>
        /// Creates an instance of the <see cref="ProducerConsumerQueueException"/>.
        /// </summary>
        /// <param name="message">The message for the <see cref="Exception"/></param>
        internal ProducerConsumerQueueException(string message) : base(message) { }

        /// <summary>
        /// Creates an instance of the <see cref="ProducerConsumerQueueException"/>.
        /// </summary>
        /// <param name="message">The message for the <see cref="Exception"/></param>
        /// <param name="innerException">The inner exception</param>
        internal ProducerConsumerQueueException(string message, Exception innerException) : base(message, innerException) { }

        /// <summary>
        /// Creates an instance of the <see cref="ProducerConsumerQueueException"/>.
        /// </summary>
        /// <param name="info">The serialization information</param>
        /// <param name="context">The streaming context</param>
        internal ProducerConsumerQueueException(SerializationInfo info, StreamingContext context) : base(info, context) { }
    }
}